#include "tbcore/db/rocksdb.hpp"

#include <rocksdb/c.h>

#include "tbcore/archive/archive.hpp"

TB_NAMESPACE_BEGIN

void __ComparatorDestructor(void*) {

}

int __ComparatorCompare(void*, const char* a, size_t alen, const char* b, size_t blen) {
  int ret = ::memcmp(a, b, std::min(alen, blen));
  if (ret == 0) {
    return (int)(alen - blen);
  } else {
    return ret;
  }
}

const char* __ComparatorName(void*) {
  return "TB_key_comparator";
}

struct ExtractorContext {
  _STD string prefix;
};

void __ExtractorDestructor(void* state) {
  delete (ExtractorContext*)state;
}

char* __ExtractorTransform(void* state, const char* key, size_t length, size_t* dst_length) {
  (void)state;
  *dst_length = length;
  return const_cast<char*>(key);
}

unsigned char __ExtractorInDomain(void*, const char* key, size_t length) {
  return 1;
}

unsigned char __ExtractorInRange(void* state, const char* key, size_t length) {
  ExtractorContext* ctx = (ExtractorContext*)state;
  if (ctx->prefix.empty()) {
    return 1;
  }
  if (length < ctx->prefix.size()) {
    return 0;
  }
  return memcmp(ctx->prefix.data(), key, length) == 0 ? 1 : 0;
}

const char* __ExtractorName(void*) {
  return "TB_prefix_extractor";
}

bool BlobKeyPred(const Blob& key, const Blob& min, const Blob& max) {
  if (key.size() == min.size() && key.size() == max.size()) {
    return memcmp(key.data(), min.data(), key.size()) >= 0 &&
      memcmp(key.data(), max.data(), key.size()) <= 0;
  }
  return false;
}

Result MakeOpResult(char* err, ErrorCode code) {
	return err == nullptr ? Result() :
		Result(code, TB_STRING(std::string(err)));
}

struct RocksdbIterator : public DBiterator {
  RocksdbIterator(rocksdb_readoptions_t* roption, rocksdb_iterator_t* readiter, 
    DBInterface::RangeProcT pred, const Blob& minkey, const Blob& maxkey) 
    : readoption(roption), readiter_(readiter), 
      pred_(pred), minkey_(minkey), maxkey_(maxkey) {}

	~RocksdbIterator() {
		if (readiter_) rocksdb_iter_destroy(readiter_);
    if (readoption) rocksdb_readoptions_destroy(readoption);
	}

	Result Next(const char** key, size_t& keylen, const char** val, size_t& vallen) {
		char* err = nullptr;
    if (rocksdb_iter_valid(readiter_)) {
      rocksdb_iter_get_error(readiter_, &err);
      if (err == nullptr) {
        *key = rocksdb_iter_key(readiter_, &keylen);
        if (pred_ && !pred_(Blob(*key, (uint32)keylen), minkey_, maxkey_)) {
          return Result(kDatabaseIteratorEnd);
        }
        *val = rocksdb_iter_value(readiter_, &vallen);
        rocksdb_iter_next(readiter_);
        return Result();
      }
    }
		return MakeOpResult(err, kReadDatabaseFailed);
	}

	rocksdb_iterator_t* readiter_;
  rocksdb_readoptions_t* readoption;
	DBInterface::RangeProcT pred_;
	Blob minkey_;
	Blob maxkey_;
};

struct RocksDB::RocksDBImpl {
	RocksDBImpl() : db_(nullptr) {}
	rocksdb_t* db_;
	String dbname_;
  ExtractorContext* extctx_;
};

RocksDB::RocksDB() : impl_(new RocksDBImpl()) {

}

RocksDB::~RocksDB() {
	delete impl_;
}

Result RocksDB::OpenDB(const String& param, bool createIfNotExist, 
	bool readOnly) {
	if (impl_->db_) {
		LWARNING() << _T("database") << param << _T("already opened");
		return Result();
	}

	rocksdb_options_t* option = rocksdb_options_create();
  rocksdb_options_set_use_fsync(option, 1);
  rocksdb_options_set_write_buffer_size(option, 1024 * 1024 * 1024);

//   rocksdb_comparator_t* comparator = rocksdb_comparator_create(
//     nullptr, __ComparatorDestructor, __ComparatorCompare, __ComparatorName);
//   rocksdb_options_set_comparator(option, comparator);

  impl_->extctx_ = new ExtractorContext;
//   rocksdb_slicetransform_t* prefixExtractor =
//     rocksdb_slicetransform_create(impl_->extctx_, __ExtractorDestructor,
//     __ExtractorTransform, __ExtractorInDomain,
//     __ExtractorInRange, __ExtractorName);
//   rocksdb_options_set_prefix_extractor(option, prefixExtractor);

	if (createIfNotExist) {
		rocksdb_options_set_create_if_missing(option, 1);
	}

	char* err = nullptr;
	_STD string name = TB_ASCII_STRING(param);
	if (readOnly) {
		impl_->db_ = rocksdb_open_for_read_only(option, name.c_str(), 0, &err);
	} else {
		impl_->db_ = rocksdb_open(option, name.c_str(), &err);
	}

	rocksdb_options_destroy(option);
	
	if (err == nullptr) {
		impl_->dbname_ = param;
		return Result();
	} else {
		return Result(kOpenDatabaseFailed, TB_STRING(std::string(err)));
	}
}

Result RocksDB::CloseDB() {
	if (impl_->db_) {
		rocksdb_close(impl_->db_);
		impl_->db_ = nullptr;
	}
	return Result();
}

Result RocksDB::DestroyDB(const String& param) {
	rocksdb_options_t* option = rocksdb_options_create();
	char* err = nullptr;
	_STD string name = TB_ASCII_STRING(param);
	rocksdb_destroy_db(option, name.c_str(), &err);
	rocksdb_options_destroy(option);

	impl_->dbname_.clear();

	return MakeOpResult(err, kDestroyDatabaseFailed);
}

Result RocksDB::Write(const Blob& key, const char* val, uint32 vallen) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* err = nullptr;
	rocksdb_writeoptions_t* woption = rocksdb_writeoptions_create();
	rocksdb_put(impl_->db_, woption, (const char*)key.data(), key.size(), val, vallen, &err);
	rocksdb_writeoptions_destroy(woption);
	return MakeOpResult(err, kWriteDatabaseFailed);
}

Result RocksDB::WriteValue(const Blob& key, const Variant& val) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	MemoryBuffer buff;
	SerVariantAsBinary(val, buff);
	Blob serData = buff.ReadBytes(buff.Size());
	return Write(key, (const char*)serData.data(), serData.size());
}

Result RocksDB::WriteBatch(const Blob* key, const Variant* vals, uint32 len) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* err = nullptr;
	MemoryBuffer buff;

	rocksdb_writebatch_t* batch = rocksdb_writebatch_create();

	for (uint32 i = 0; i < len; ++i) {
		buff.Reset();
		SerVariantAsBinary(vals[i], buff);
		Blob serData = buff.ReadBytes(buff.Size());
		rocksdb_writebatch_put(batch, (const char*)key[i].data(), key[i].size(),
			(const char*)serData.data(), serData.size());
	}
		
	rocksdb_writeoptions_t* woption = rocksdb_writeoptions_create();
	rocksdb_write(impl_->db_, woption, batch, &err);
	rocksdb_writeoptions_destroy(woption);

	rocksdb_writebatch_destroy(batch);

	return MakeOpResult(err, kWriteDatabaseFailed);
}

void RocksDB::Flush() {
	if (impl_->db_) {
		rocksdb_flushoptions_t* option = rocksdb_flushoptions_create();
		char* err = nullptr;
		rocksdb_flush(impl_->db_, option, &err);
		if (err) {
			_LERROR() << "error occured when flush the db, detail: " << err;
		}
		rocksdb_flushoptions_destroy(option);
	}
}

Result RocksDB::Read(const Blob& key, char** val, uint32& vallen) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* err = nullptr;
	size_t size;;
	rocksdb_readoptions_t* roption = rocksdb_readoptions_create();
	*val = rocksdb_get(impl_->db_, roption, (const char*)key.data(), key.size(), &size, &err);
	rocksdb_readoptions_destroy(roption);
	vallen = (uint32)size;
	return MakeOpResult(err, kReadDatabaseFailed);
}

Result RocksDB::ReadValue(const Blob& key, Variant& val) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* data;
	uint32 len;
	Result ret = Read(key, &data, len);
	if (ret) {
		DeserBinaryAsVariant(data, len, val);
		if (!val.IsNull()) {
			return Result();
		}
	}
	return ret;
}

DBIteratorPtr RocksDB::ReadFilter(DBFilterInterface* filter) {
	if (!impl_->db_) {
		return DBIteratorPtr();
	}
	//to be implemented
	return DBIteratorPtr();
}

Result RocksDB::ReadBatch(const Blob* keys, uint32 keylen, 
	char** vallist, size_t& valsizelist) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* keyslist;
	size_t* keysizelist = (size_t*)TB_MALLOC(keylen * sizeof(size_t));
	size_t keysmemlen = 0;
	for (uint32 i = 0; i < keylen; ++i) {
		*(keysizelist + i) = keys[i].size();
		keysmemlen += *(keysizelist + i);
	}
	keyslist = (char*)TB_MALLOC(keysmemlen * sizeof(char));
	uint32 accum = 0;
	for (uint32 i = 0; i < keylen; ++i) {
		memcpy(keyslist + accum, keys[i].data(), keys[i].size());
		accum += keys[i].size();
	}
	char* err = nullptr;
	rocksdb_readoptions_t* roption = rocksdb_readoptions_create();
	rocksdb_multi_get(impl_->db_, roption, keylen, &keyslist, keysizelist, vallist, &valsizelist, &err);
	rocksdb_readoptions_destroy(roption);
	return MakeOpResult(err, kReadDatabaseFailed);
}

DBIteratorPtr RocksDB::ReadByRange(const Blob& minkey, const Blob& maxkey,
  const _STD string& prefix, RangeProcT pred) {
	if (!impl_->db_) {
		DBIteratorPtr();
	}

  impl_->extctx_->prefix = prefix;

  //delay option deletion
	rocksdb_readoptions_t* roption = rocksdb_readoptions_create();
  rocksdb_readoptions_set_iterate_upper_bound(roption, maxkey.content(), maxkey.size());
	rocksdb_iterator_t* readitor = rocksdb_create_iterator(impl_->db_, roption);
  rocksdb_iter_seek(readitor, minkey.content(), minkey.size());
  return make_shared<RocksdbIterator>(roption, readitor, pred, minkey.copy(), maxkey.copy());
}

Result RocksDB::Delete(const Blob& key) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	char* err = nullptr;
	rocksdb_writeoptions_t* woption = rocksdb_writeoptions_create();
	rocksdb_delete(impl_->db_, woption, (const char*)key.data(), key.size(), &err);
	rocksdb_writeoptions_destroy(woption);
	return MakeOpResult(err, kWriteDatabaseFailed);
}

Result RocksDB::DeleteFilter(DBFilterInterface* filter, bool onlyOnce) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	//to be implemented
	return Result();
}

Result RocksDB::DeleteBatch(const Blob* keys, uint32 keylen) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	rocksdb_writebatch_t* batch = rocksdb_writebatch_create();
	for (uint32 i = 0; i < keylen; ++i) {
		rocksdb_writebatch_delete(batch, (const char*)keys[i].data(), (size_t)keys[i].size());
	}
	rocksdb_writebatch_destroy(batch);
	return MakeOpResult(nullptr, kWriteDatabaseFailed);
}

Result RocksDB::DeleteByRange(const Blob& minkey, const Blob& maxkey, 
RangeProcT pred) {
	if (!impl_->db_) {
		return Result(kInvalidDatebase);
	}

	//to be implemented
	return Result();
}

TB_NAMESPACE_END